// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::ActorId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SourceId;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::MetaResult;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::manager::LocalNotification;
use crate::model::FragmentDownstreamRelation;
use crate::stream::{SourceChange, SplitState};

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark blocked and abort buffered schedules, they might be dirty already.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovering");
    }

    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        let job_id = job.job_id();
        job.finish(&self.metadata_manager, &self.source_manager)
            .await?;
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
        Ok(())
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        self.env.cdc_table_backfill_tracker.complete_job(job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_list_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Find the database ID for this table
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create ListFinish command
            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }

    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_load_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Find the database ID for this table
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create LoadFinish command
            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            let table_id = job_id.as_mv_table_id();

            self.refresh_manager.mark_refresh_complete(table_id).await?;
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
    /// Do some stuffs after barriers are collected and the new storage version is committed, for
    /// the given command.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                ..
            }) => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(assignment)
                    .await?;
            }

            Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
                ..
            } => {
                for job_id in streaming_job_ids {
                    barrier_manager_context
                        .refresh_manager
                        .remove_progress_tracker(job_id.as_mv_table_id(), "drop_streaming_jobs");
                }

                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(unregistered_state_table_ids.iter().copied())
                    .await;
            }
            Command::ConnectorPropsChange(obj_id_map_props) => {
                // todo: we dont know the type of the object id, it can be a source or a sink. Should carry more info in the barrier command.
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(source_id, props)| (source_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Do `post_collect_job_fragments` of the original streaming job in the end, so that in any previous failure,
                // we won't mark the job as `Creating`, and then the job will be later clean by the recovery triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&info.init_split_assignment),
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment { reschedules, .. } => {
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                },
            ) => {
                // Update actors and actor_dispatchers for new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(), // upstream_fragment_downstreams is already inserted in the job of upstream table
                                None, // no replace plan
                                None, // no init split assignment
                            )
                            .await?;
                    }
                }

                // Apply the split changes in source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::StartFragmentBackfill { .. } => {}
            Command::ListFinish { .. } | Command::LoadFinish { .. } | Command::Refresh { .. } => {}
        }

        Ok(())
    }
}
